 1.Kafka源码剖析之KafkaRequestHandlerPool
   
   KafkaRequestHandlerPool的作用是创建numThreads个KafkaRequestHandler实例，使用numThreads个线程
启动KafkaRequestHandler。
   每个KafkaRequestHandler包含了id，brokerId，线程数，请求的channel，处理请求的api等信息。
   只要该类进行实例化，就执行创建KafkaRequestHandler实例并启动的逻辑。
/**
* @param brokerId
* @param requestChannel
* @param apis 处理具体请求和响应的api
* @param time
* @param numThreads 运行KafkaRequestHandler的线程数
*/
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int) extends Logging with KafkaMetricsGroup
{
// 创建包含numThreads个元素的数组
   val runnables = new Array[KafkaRequestHandler](numThreads)
// 循环numThreads次，初始化KafkaRequestHandler实例numThreads个
   for(i <- 0 until numThreads) {
// 赋值：每个KafkaRequestHandler中包含了KafkaRequestHandler的id，brokerId，线程数，请求
// 的channel，处理请求的api等。
     runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter,
numThreads, requestChannel, apis, time)
// 启动这些KafkaRequestHandler线程⽤于请求的处理
     KafkaThread.daemon("kafka-request-handler-" + i, runnables(i)).start()
    }
}
    KafkaThread的start方法即是调用Thread的start方法,而start执行run方法,即此处执行的是KafkaThread的run方法：

def run() {
  while(true) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
     val startSelectTime = time.nanoseconds
// 获取请求
     val req = requestChannel.receiveRequest(300)
     val endTime = time.nanoseconds
     val idleTime = endTime - startSelectTime
     aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
     
	 req match {
       case RequestChannel.ShutdownRequest =>
         debug(s"Kafka request handler $id on broker $brokerId received shut down
command")
         latch.countDown()
         return
       case request: RequestChannel.Request =>
         try {
           request.requestDequeueTimeNanos = endTime
           trace(s"Kafka request handler $id on broker $brokerId handling request
$request")
// 对于其他请求，直接交给apis来负责处理。
           apis.handle(request)
       } catch {
         case e: FatalExitError =>
           latch.countDown()
           Exit.exit(e.statusCode)
         case e: Throwable => error("Exception when handling request", e)
       } finally {
           request.releaseBuffer()
       }
      case null => // continue
     }
   }
 }
    该类包含了关闭KafkaRequestHandler的方法：
	具体的方法：
	首先发送停止的请求,等待用户请求处理的结束latch.await() 
	优雅停机
	将请求直接放到requestQueue中。
	其中处理ShutdownRequest的处理逻辑：